package com.chb.userbehavioranalysis.order

import java.util

import org.apache.flink.cep.{PatternSelectFunction, PatternTimeoutFunction}
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time


/**
 * Input record describing a single order event.
 *
 * @param orderId   identifier of the order
 * @param eventType type of the event; the job in this file matches on "create" and "pay"
 * @param txId      transaction identifier
 * @param eventTime time of the event; the job multiplies it by 1000 before using it as a
 *                  timestamp, so it is presumably in epoch seconds (TODO confirm against the data)
 */
case class OrderEvent(
    orderId: Long,
    eventType: String,
    txId: String,
    eventTime: Long
)

/**
 * Output record emitted by the job for each order.
 *
 * @param orderId   identifier of the order the result refers to
 * @param resultMsg human-readable outcome message
 */
case class OrderResult(
    orderId: Long,
    resultMsg: String
)

/**
 * Order pay-timeout detection job.
 *
 * Reads order events from the `/OrderLog.csv` classpath resource, keys them by order id and
 * uses Flink CEP to look for a "create" event followed (not necessarily immediately) by a
 * "pay" event within 15 minutes. Completed matches are printed from the main output with the
 * "payed" prefix; partial matches that time out are printed from the `orderTimeout` side
 * output with the "timeout" prefix.
 */
object OrderTimeout {
    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments (unused)
     * @throws IllegalStateException if `/OrderLog.csv` is not found on the classpath
     */
    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        env.setParallelism(1)

        // 1. Read the order data.
        // getResource returns null when the resource is missing; fail fast with a clear
        // message instead of a bare NullPointerException on resource.getPath.
        val resource = Option(getClass.getResource("/OrderLog.csv")).getOrElse(
            throw new IllegalStateException("Resource /OrderLog.csv not found on the classpath"))
        val orderEventStream = env.readTextFile(resource.getPath)
            // val orderEventStream = env.socketTextStream("chb1", 8888)
            .map(data => {
                // Expected line layout: orderId,eventType,txId,eventTime.
                // A line with fewer fields or non-numeric ids/times throws here and fails the job.
                val dataArray = data.split(",")
                OrderEvent(dataArray(0).trim.toLong, dataArray(1).trim, dataArray(2).trim, dataArray(3).trim.toLong)
            })
            // eventTime is scaled by 1000 to obtain the timestamp (presumably seconds -> ms);
            // the input is assumed to be in ascending time order.
            .assignAscendingTimestamps(_.eventTime * 1000L)
            .keyBy(_.orderId)

        // 2. Define the pattern to match: create -> (any other events) -> pay
        val orderPayPattern = Pattern.begin[OrderEvent]("begin").where(_.eventType == "create")
            // followedBy does not require strict adjacency: paying any time inside the window counts
            .followedBy("follow").where(_.eventType == "pay")
            .within(Time.minutes(15))

        // 3. Apply the pattern to the keyed stream to obtain a pattern stream
        val patternStream = CEP.pattern(orderEventStream, orderPayPattern)

        // 4. Call select to extract the event sequences; timed-out sequences are routed to a
        //    side output so they can be reported as alerts
        val orderTimeoutOutputTag = new OutputTag[OrderResult]("orderTimeout")

        val resultStream = patternStream.select(orderTimeoutOutputTag,
            new OrderTimeoutSelect(),
            new OrderPaySelect())

        resultStream.print("payed") // main stream: orders paid in time
        resultStream.getSideOutput(orderTimeoutOutputTag).print("timeout") // side output, obtained from the main stream

        env.execute("order timeout job")
    }
}

/**
 * Handler for timed-out (partial) matches of the order-pay pattern.
 *
 * Produces an [[OrderResult]] with the message "timeout" for the order whose "begin" event
 * was matched.
 */
class OrderTimeoutSelect() extends PatternTimeoutFunction[OrderEvent, OrderResult] {
    /**
     * @param map matched events keyed by pattern stage name
     * @param l   timestamp supplied by Flink CEP for the timeout (unused here)
     * @return a "timeout" result carrying the id of the order that was created
     */
    override def timeout(map: util.Map[String, util.List[OrderEvent]], l: Long): OrderResult = {
        // A timed-out sequence only contains the "begin" stage; take its first event.
        val beginEvents = map.get("begin")
        val createEvent = beginEvents.iterator().next()
        OrderResult(orderId = createEvent.orderId, resultMsg = "timeout")
    }
}

/**
 * Handler for complete matches of the order-pay pattern.
 *
 * Produces an [[OrderResult]] with the message "payed successfully" for the order whose
 * "follow" (pay) event was matched.
 */
class OrderPaySelect() extends PatternSelectFunction[OrderEvent, OrderResult] {
    /**
     * @param map matched events keyed by pattern stage name
     * @return a success result carrying the id of the order taken from the pay event
     */
    override def select(map: util.Map[String, util.List[OrderEvent]]): OrderResult = {
        // A complete match (begin -> ... -> follow) means the payment succeeded;
        // read the order id from the first event of the "follow" stage.
        val followEvents = map.get("follow")
        val payEvent = followEvents.iterator().next()
        OrderResult(orderId = payEvent.orderId, resultMsg = "payed successfully")
    }
}
